热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

技术|使用FlinkSQL传输市场数据1:传输VWAP

FlinkSQL是一种数据处理语言,用于事件驱动和流应用程序的快速原型设计和开发。他将SQL的简单性和可访问性与Flink的性能和可伸缩性结合在一起。借助FlinkSQL,大家都可

点击Cloudera中国 即可订阅!

本文是一个由多部分组成的系列文章的第一篇,展示了 FlinkSQL 应用于市场数据的强大功能和可表达性。该系列的代码和数据可在 github 上获得。它由量化建模负责人 Simudyne 和 Krishnen Vytelingum 合着。

速度在金融市场上至关重要。无论目标是最大化 alpha 还是最大程度地减少风险,金融技术人员都会投入大量资金,以获取有关市场状况以及行情的最新见解。事件驱动和流式处理体系结构可在事件发生时对事件进行复杂的处理,使其很自然地适合金融市场应用。


Flink SQL 是一种数据处理语言,可用于事件驱动和流应用程序的快速原型设计和开发。Flink SQL 将 SQL 的简单性和可访问性与 Apache Flink(一种流行的分布式流媒体平台)的性能和可伸缩性结合在一起。借助 Flink SQL,业务分析人员、开发人员和量化人员都可以快速建立流传输管道,以实时执行复杂的数据分析。


在本文中,我们将使用 Simudyne 开发的基于代理的模型(ABM)生成的综合市场数据。ABM 并不是自上而下的方法,而是在复杂系统中对自主参与者(或代理)进行建模,例如:金融市场中的各种买卖双方。可以捕获这些交互,并可以针对许多应用程序分析生成的综合数据集,例如用于检测紧急欺诈行为的训练模型,或探索风险管理的“假设”场景。ABM 生成的综合数据在历史数据不足或不可用的情况下很有用。


流式 VWAP

我们从一个简单的示例开始,该示例从一系列交易事件中计算成交量加权平均价格(VWAP)。VWAP 是交易中用来衡量证券的市场价格和未来方向的通用基准。在这里,我们有一个 CSV 格式的数据集,该数据集显示了一个交易日(2020年10月22日)的虚构证券(SIMUI)的交易事件。 

    sym,prc,vol,bid_id,ask_id,buyer_id,seller_id,step,time
    SIMUl,149.86,2300,P|63-m-1,P|66-l-0,P|63,P|66,380,22-Oct-2020 08:00:07.600
    SIMUl,149.86,1935,P|63-m-1,P|25-l-0,P|63,P|25,380,22-Oct-2020 08:00:07.600
    SIMUl,149.74,582,P|18-l-0,P|98-m-0,P|18,P|98,428,22-Oct-2020 08:00:08.560
    SIMUl,149.76,2475,P|27-l-0,P|42-m-1,P|27,P|42,1021,22-Oct-2020 08:00:20.420
    SIMUl,149.84,21,P|5-m-0,P|42-l-0,P|5,P|42,1078,22-Oct-2020 08:00:21.560
    SIMUl,149.76,2709,P|24-l-1,P|92-m-0,P|24,P|92,1200,22-Oct-2020 08:00:24.000
    SIMUl,149.84,1653,P|8-m-1,P|24-l-0,P|8,P|24,1513,22-Oct-2020 08:00:30.260
    SIMUl,149.84,400,P|19-m-0,P|24-l-0,P|19,P|24,1577,22-Oct-2020 08:00:31.540

    这些列是:交易品种,价格,数量,出价 ID,要价 ID,买方 ID,卖方 ID,步骤和时间戳。步骤列是离散步骤 ABM 市场模拟的伪像,出于我们的目的可以忽略;其余各栏不言自明。


    要处理此数据,我们需要通过发出 CREATE TABLE 语句来声明 Flink SQL 表。我们的示例数据是基于文件系统的,但是可以轻松更改连接器类型以从其他来源(例如Kafka主题)读取数据。请注意,event_time 是派生的列,也用于水印。通过加水印,Flink 可以限制等待延迟到达和故障事件的时间,以便可以取得进展。在这里,我们声明,到达 event_time 超过水印一分钟以上的记录将被忽略。

      CREATE TABLE trades (
      symbol STRING,
      price DOUBLE,
      vol INT,
      bid_id STRING,
      ask_id STRING,
      buyer_id STRING,
      seller_id STRING,
      step INT,
      ts_str STRING,
      event_time AS TO_TIMESTAMP (ts_str, 'dd-MMM-yyyy HH:mm:ss.SSS'),
      WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
      ) WITH (
      'connector' = 'filesystem',
      'path' = '/path/to/varstream/data/trades_raw',
      'format' = 'csv'
      );

      VWAP 的公式很简单:对于指定时间段内的每笔交易,将价格乘以交易股份数即可。将其总和除以该时间段内已交易的股票总数。下面的流查询将显示当前的 VWAP,它将随着新交易事件的到来而更新:

        SELECT
        symbol,
        SUM (vol)                     AS cumulative_volume,
        SUM (price * vol)             AS cumulative_pv,
        SUM (price * vol) / SUM (vol) AS vwap
        FROM
        trades
        GROUP BY
        symbol
        ;


        实时播放

        由于 CSV 文件中一个符号中只有一天的数据价值,因此结果更新可能发生得太快了,您几乎没有注意到。从源读取事件的速度比实时发生的速度要快。有时需要在准实时回放历史数据,就好像 Flink 现在正在接收历史事件数据(例如,用于演示或原型设计和开发过程中)。


        为了解决这个问题,我们提供了一个简单的 UDTF(用户定义的表函数),该数据以从行时间戳派生的人工延迟播放历史数据。UDTF 有两个参数:第二个参数指定行时间戳(在我们的示例中为 event_time ),而第一个参数指定第一个行时间戳之后的分钟持续时间(以分钟为单位),以开始应用延迟。以下代码段显示了如何注册 UDTF 并在处理事件的前120分钟后将其用于视图中以应用延迟。请注意 LATERAL TABLE 联接的使用,该联接将函数应用于主表中的每一行。

          -- Register UDTF
          CREATE FUNCTION replay_after AS 'varstream.ReplayAfterFunction' LANGUAGE JAVA ;
          -- Create a view
          CREATE VIEW trades_replay AS (
          SELECT * FROM trades
          LEFT JOIN LATERAL TABLE (replay_after (120, trades.event_time)) ON TRUE
          ) ;

          您可以通过发出一个简单的查询来验证事件的重播方式:

          SELECT * FROM trades_replay

          使用此视图,我们现在可以发出相同的 VWAP 聚合查询,并观察对 VWAP 的流更新,就好像它们是实时发生的一样:

            SELECT
            symbol,
            SUM (vol) AS cumulative_volume,
            SUM (price * vol) AS cumulative_pv,
            SUM (price * vol) SUM (vol) AS vwap
            FROM
            trades_replay
            GROUP BY
            symbol
            ;

            尽管此 UDTF 在进行原型制作时非常有用,但从根本上没有打算把它用于生产用途。我们在这里使用它只是为了演示 FlinkSQL 如何在事件以模拟实时到达时更新聚合结果。


            Group Windows

            前面的示例显示了如何计算当天的流式 VWAP。假设您要以每隔1分钟的时间建立一个带有蜡烛图的交易仪表板。您可能需要计算每分钟的 VWAP、高价、低价和总体积。Flink SQL 通过组窗口使此操作变得容易,组窗口可以在 GROUP BY 时间间隔上应用聚合函数。


            下面显示了如何获取每分钟的 VWAP:

              CREATE VIEW vwap_1m AS (
              SELECT
              symbol,
              TUMBLE_START (event_time, INTERVAL '1' MINUTES) AS start_time,
              TUMBLE_ROWTIME (event_time, INTERVAL '1' MINUTES) AS row_time,
              MAX (price) AS max_price,
              MIN (price) AS min_price,
              SUM (price * vol) AS total_price,
              SUM (vol) AS total_vol,
              SUM (price * vol) SUM (vol) AS vwap
              FROM
              trades
              GROUP BY
              TUMBLE (event_time, INTERVAL '1' MINUTES), symbol
              );
              SELECT symbol, start_time, total_price, total_vol, vwap FROM vwap_1m ;

              前面的操作为每分钟内发生的交易计算了 VWAP。如果要在几分钟内计算移动的 VWAP(MVWAP),则 Flink SQL 提供了一个跳跃的组窗口。下面显示了5分钟的移动 VWAP,步长为1分钟。

                CREATE VIEW vwap_5m AS (
                SELECT
                symbol,
                HOP_START (event_time, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES) AS start_time,
                HOP_ROWTIME (event_time, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES) AS row_time,
                MAX (price) AS max_price,
                MIN (price) AS min_price,
                SUM (price * vol) AS total_price,
                SUM (vol) AS total_vol,
                SUM (price * vol) / SUM (vol) AS vwap
                FROM
                trades
                GROUP BY
                HOP (event_time, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES), symbol
                );
                SELECT symbol, start_time, total_price, total_vol, vwap FROM vwap_5m ;

                结论

                Flink SQL 可以极大地简化和加快流数据流的开发。在本文中,我们探索了 SQL GROUP BY 子句的不同用法,以根据市场数据流计算 VWAP 的变化。在下一部分中,我们将向您展示如何从市场数据中提取每分钟的流式采样,以计算日内风险价值(IVaR)。我们希望本系列文章能鼓励您尝试将 Flink SQL 用于流式市场数据应用程序。

                原文作者

                Patrick Angeles& Krishnen Vytelingum

                原文链接

                https://blog.cloudera.com/streaming-market-data-with-flink-sql-part-i-streaming-vwap/


                Cloudera中国

                更多资讯,点击阅读原文

                长按扫码关注我们




                推荐阅读
                • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
                • 本文总结了初学者在使用dubbo设计架构过程中遇到的问题,并提供了相应的解决方法。问题包括传输字节流限制、分布式事务、序列化、多点部署、zk端口冲突、服务失败请求3次机制以及启动时检查。通过解决这些问题,初学者能够更好地理解和应用dubbo设计架构。 ... [详细]
                • 本文讨论了在使用Git进行版本控制时,如何提供类似CVS中自动增加版本号的功能。作者介绍了Git中的其他版本表示方式,如git describe命令,并提供了使用这些表示方式来确定文件更新情况的示例。此外,文章还介绍了启用$Id:$功能的方法,并讨论了一些开发者在使用Git时的需求和使用场景。 ... [详细]
                • 前言本篇为大家总结社区多人合作常见的场景和对应的git操作命令。本篇非新手教程,阅读本篇前需具备Git基础知识。Git入门教程请参考https://www ... [详细]
                • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
                • svnWebUI:一款现代化的svn服务端管理软件
                  svnWebUI是一款图形化管理服务端Subversion的配置工具,适用于非程序员使用。它解决了svn用户和权限配置繁琐且不便的问题,提供了现代化的web界面,让svn服务端管理变得轻松。演示地址:http://svn.nginxwebui.cn:6060。 ... [详细]
                • 本文介绍了如何使用PHP代码将表格导出为UTF8格式的Excel文件。首先,需要连接到数据库并获取表格的列名。然后,设置文件名和文件指针,并将内容写入文件。最后,设置响应头部,将文件作为附件下载。 ... [详细]
                • Java如何导入和导出Excel文件的方法和步骤详解
                  本文详细介绍了在SpringBoot中使用Java导入和导出Excel文件的方法和步骤,包括添加操作Excel的依赖、自定义注解等。文章还提供了示例代码,并将代码上传至GitHub供访问。 ... [详细]
                • 像跟踪分布式服务调用那样跟踪Go函数调用链 | Gopher Daily (2020.12.07) ʕ◔ϖ◔ʔ
                  每日一谚:“Acacheisjustamemoryleakyouhaven’tmetyet.”—Mr.RogersGo技术专栏“改善Go语⾔编程质量的50个有效实践” ... [详细]
                • ZooKeeper 学习
                  前言相信大家对ZooKeeper应该不算陌生。但是你真的了解ZooKeeper是个什么东西吗?如果别人面试官让你给他讲讲ZooKeeper是个什么东西, ... [详细]
                • Flink使用java实现读取csv文件简单实例首先我们来看官方文档中给出的几种方法:首先我们来看官方文档中给出的几种方法:第一种:Da ... [详细]
                • 如何在mysqlshell命令中执行sql命令行本文介绍MySQL8.0shell子模块Util的两个导入特性importTableimport_table(JS和python版本 ... [详细]
                • 简介数组、CSV、表格、东西将一个数组转化为逗号为支解符的字符串(CSV)即表格数据。该源码来自于https:30secondsofcode.orgconstarrayToCSV( ... [详细]
                • 1.淘宝模拟登录2.天猫商品数据爬虫3.爬取淘宝我已购买的宝贝数据4.每天不同时间段通过微信发消息提醒女友5.爬取5K分辨率超清唯美壁纸6.爬取豆瓣排行榜电影数据(含GUI界面版) ... [详细]
                • TableAPI报一下异常:FieldtypesofqueryresultandregisteredTableSink
                  报错信息如下:Exceptioninthread“main”org.apache.flink.table.api.ValidationException:Fieldtypesofq ... [详细]
                author-avatar
                HikariFocus_695
                这个家伙很懒,什么也没留下!
                PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
                Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有